#include "config.h"

#include <sys/types.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBCLUSTER

#include "ynet_rpc.h"
#include "net_global.h"
#include "job_dock.h"
#include "longtask.h"
#include "cluster.h"
#include "rpc_proto.h"
#include "cluster_rpc.h"
#include "dbg.h"

/*
 * Opcodes carried in msg_t.op for MSG_CLUSTER requests.
 *
 * The numeric values also index the per-opcode handler tables in this file,
 * which are sized by CLUSTER_RPC_MAX, so any new opcode has to be inserted
 * before CLUSTER_RPC_MAX.
 */
typedef enum {
        CLUSTER_RPC_NULL = 0,   /* base value; cluster_rpc_init() registers no handler for it */
        CLUSTER_RPC_GETINFO,    /* served by __cluster_srv_getinfo */
        CLUSTER_RPC_ANALYSIS,   /* served by __cluster_srv_analysis */
        CLUSTER_RPC_PING,       /* served by __cluster_srv_ping */
        CLUSTER_RPC_MAX,        /* one past the last opcode; bounds the handler tables */
} cluster_rpc_op_t;

/*
 * Header of a cluster RPC request: a 32-bit opcode immediately followed by
 * the opaque-encoded arguments of that call.
 */
typedef struct {
        uint32_t op;    /* a cluster_rpc_op_t value, set by the cluster_rpc_*() senders */
        char buf[0];    /* start of the payload written by _opaque_encode(); zero-length array (GNU extension) */
} msg_t;

/*
 * Per-opcode dispatch tables, both indexed by (op - CLUSTER_RPC_NULL):
 * the handler function and the label registered together with it.
 * They have static storage, so a slot that was never registered holds a
 * NULL handler and an empty name.
 */
static __request_handler_func__  __request_handler__[CLUSTER_RPC_MAX - CLUSTER_RPC_NULL];
static char  __request_name__[CLUSTER_RPC_MAX - CLUSTER_RPC_NULL][__RPC_HANDLER_NAME__ ];

/*
 * Register @func as the handler for opcode @op and store @name as its label.
 *
 * @op    opcode; must lie in [CLUSTER_RPC_NULL, CLUSTER_RPC_MAX) (asserted).
 * @func  handler stored in the dispatch table slot for @op.
 * @name  label copied into the name table; strlen(name) + 1 must be strictly
 *        less than __RPC_HANDLER_NAME__ (asserted).
 */
static void __request_set_handler(int op, __request_handler_func__ func, const char *name)
{
        int idx = op - CLUSTER_RPC_NULL;

        /* Both tables are fixed-size: refuse an opcode that would index outside them. */
        YASSERT(op >= CLUSTER_RPC_NULL && op < CLUSTER_RPC_MAX);
        YASSERT(strlen(name) + 1 < __RPC_HANDLER_NAME__ );

        strcpy(__request_name__[idx], name);
        __request_handler__[idx] = func;
}

/*
 * Look up the handler and label registered for opcode @op.
 *
 * @op    opcode to look up; may be an arbitrary value taken from a request.
 * @func  out: the registered handler, or NULL when @op is outside
 *        [CLUSTER_RPC_NULL, CLUSTER_RPC_MAX) or has no handler registered.
 * @name  out: the registered label; set to the empty string when @op is out
 *        of range. The caller provides the storage.
 */
static void __request_get_handler(int op, __request_handler_func__ *func, char *name)
{
        /*
         * The opcode originates from the wire, so it must be range-checked
         * before it is used as a table index.
         */
        if (op < CLUSTER_RPC_NULL || op >= CLUSTER_RPC_MAX) {
                *func = NULL;
                name[0] = '\0';
                return;
        }

        *func = __request_handler__[op - CLUSTER_RPC_NULL];
        strcpy(name, __request_name__[op - CLUSTER_RPC_NULL]);
}

/*
 * Copy the whole request held in @buf into the caller's scratch area @_buf
 * and expose it as a msg_t.
 *
 * @buf     received request; its length is asserted to be at most MEM_CACHE_SIZE4K.
 * @_req    out: points at @_buf, viewed as a msg_t.
 * @buflen  out: number of payload bytes, i.e. buf->len minus the msg_t header.
 * @_buf    caller-provided storage that receives buf->len bytes.
 */
static void __getmsg(buffer_t *buf, msg_t **_req, int *buflen, char *_buf)
{
        msg_t *msg = (void *)_buf;

        YASSERT(buf->len <= MEM_CACHE_SIZE4K);

        *_req = msg;

        /* The payload length is taken before the bytes are copied out. */
        *buflen = buf->len - sizeof(*msg);
        mbuffer_get(buf, msg, buf->len);
}

/*
 * Server side of CLUSTER_RPC_ANALYSIS.
 *
 * Decodes a str_t list of keys from the request, asks
 * analysis_dump("analysis.jobdock", key, ...) for each of them, and replies
 * with a str_t holding one entry per key: the dumped value, or the literal
 * "not available" when analysis_dump() reports an error.
 *
 * Always returns 0; a per-key failure is reported inside the reply.
 */
static int __cluster_srv_analysis(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, paylen;
        msg_t *msg;
        const str_t *keys;
        const char *name;
        char reqbuf[MAX_BUF_LEN], repbuf[MAX_BUF_LEN], value[MAX_NAME_LEN];
        str_t *reply = (void *)repbuf;

        __getmsg(_buf, &msg, &paylen, reqbuf);
        _opaque_decode(msg->buf, paylen, &keys, NULL, NULL);

        /* Start from an empty reply list and add one entry per requested key. */
        reply->len = 0;
        str_for_each(keys, name) {
                /* NOTE(review): value is MAX_NAME_LEN bytes and no size is passed
                 * to analysis_dump() -- confirm it cannot write more than that. */
                ret = analysis_dump("analysis.jobdock", name, value);
                if (unlikely(ret)) {
                        str_append(reply, "not available");
                        DBUG("%s %s\n", name, reply->buf);
                } else {
                        DBUG("%s : %s\n", name, value);
                        str_append(reply, value);
                }
        }

        rpc_reply(sockid, msgid, reply, reply->len);

        return 0;
}

/*
 * Server side of CLUSTER_RPC_GETINFO.
 *
 * Decodes the peer nid from the request, queries netable_getinfo() for it
 * and sends the returned bytes back as the reply.
 *
 * Returns 0 once the reply has been sent. When netable_getinfo() fails, its
 * error code is returned and no reply is sent from here.
 */
static int __cluster_srv_getinfo(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, paylen;
        msg_t *msg;
        const nid_t *peer;
        char reqbuf[MAX_BUF_LEN], info[MAX_BUF_LEN];
        /* NOTE(review): the size handed to netable_getinfo() is MAX_NAME_LEN
         * although info[] is MAX_BUF_LEN bytes -- confirm this is intended. */
        uint32_t infolen = MAX_NAME_LEN;

        __getmsg(_buf, &msg, &paylen, reqbuf);
        _opaque_decode(msg->buf, paylen, &peer, NULL, NULL);

        ret = netable_getinfo(peer, (void *)info, &infolen);
        if (unlikely(ret)) {
                DWARN("query %s ret %d\n",
                      network_rname(peer), ret);
                GOTO(err_ret, ret);
        }

        /* infolen now holds the length reported back by netable_getinfo(). */
        rpc_reply(sockid, msgid, info, infolen);

        return 0;
err_ret:
        return ret;
}

/*
 * Server side of CLUSTER_RPC_PING: answer with an empty reply.
 * Always returns 0.
 */
static int __cluster_srv_ping(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        /* A ping has no arguments, so the request buffer is never read. */
        (void) _buf;

        /* Empty reply body: NULL payload, zero length. */
        rpc_reply(sockid, msgid, NULL, 0);
        return 0;
}

/*
 * Ask node @nid for the network info it holds about @peer.
 *
 * @nid   node to query; must not be the null nid (asserted).
 * @peer  node whose info is requested; sent as the request payload.
 * @info  passed to rpc_request_wait() as the reply buffer.
 *
 * Returns 0 on success, otherwise the non-zero code from rpc_request_wait().
 */
int cluster_rpc_getinfo(const nid_t *nid, const nid_t *peer, ynet_net_info_t *info)
{
        int ret;
        uint32_t paylen;
        char reqbuf[MAX_BUF_LEN];
        msg_t *msg = (void *)reqbuf;

        YASSERT(net_isnull(nid) == 0);

        /* Request layout: opcode header followed by the encoded peer nid. */
        msg->op = CLUSTER_RPC_GETINFO;
        _opaque_encode(msg->buf, &paylen, peer, sizeof(*peer), NULL);

        ret = rpc_request_wait("cluster_getinfo", nid,
                               msg, sizeof(*msg) + paylen,
                               info, NULL,
                               MSG_CLUSTER, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Send a CLUSTER_RPC_PING request to @nid and wait for the reply.
 *
 * @nid      node to ping; must not be the null nid (asserted).
 * @timeout  passed through to rpc_request_wait().
 *
 * Returns 0 on success, otherwise the non-zero code from rpc_request_wait().
 */
int cluster_rpc_ping(const nid_t *nid, int timeout)
{
        int ret;
        msg_t req;

        YASSERT(net_isnull(nid) == 0);

        /* A ping is just the opcode header: there is no payload to encode. */
        req.op = CLUSTER_RPC_PING;

        ret = rpc_request_wait("cluster_ping", nid,
                               &req, sizeof(req),
                               NULL, NULL,
                               MSG_CLUSTER, 0, timeout);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Ask node @nid for the analysis values of the keys listed in @key.
 *
 * @nid    node to query; must not be the null nid (asserted).
 * @key    list of keys; key->len + 1 bytes of it are encoded into the request.
 * @value  passed to rpc_request_wait() as the reply buffer.
 *
 * Returns 0 on success, otherwise the non-zero code from rpc_request_wait().
 */
int cluster_rpc_analysis(const nid_t *nid, const str_t *key, str_t *value)
{
        int ret;
        uint32_t paylen;
        char reqbuf[MAX_BUF_LEN];
        msg_t *msg = (void *)reqbuf;

        YASSERT(net_isnull(nid) == 0);

        msg->op = CLUSTER_RPC_ANALYSIS;
        /* NOTE(review): nothing here checks that the encoded key list fits in
         * reqbuf -- confirm callers keep key->len well below MAX_BUF_LEN. */
        _opaque_encode(msg->buf, &paylen, key, key->len + 1, NULL);

        ret = rpc_request_wait("cluster_analysis", nid,
                               msg, sizeof(*msg) + paylen,
                               value, NULL,
                               MSG_CLUSTER, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static void __request_handler(void *arg)
{
        int ret;
        msg_t req;
        sockid_t sockid;
        msgid_t msgid;
        buffer_t buf;
        __request_handler_func__ handler;
        char name[MAX_NAME_LEN];

        request_trans(arg, &sockid, &msgid, &buf, NULL);

        if (buf.len < sizeof(req)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        mbuffer_get(&buf, &req, sizeof(req));

        DBUG("new job op %u\n", req.op);

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        __request_get_handler(req.op, &handler, name);
        if (handler == NULL) {
                ret = ENOSYS;
                DWARN("error op %u\n", req.op);
                GOTO(err_ret, ret);
        }

        schedule_task_setname(name);

        ret = handler(&sockid, &msgid, &buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mbuffer_free(&buf);

        return ;
err_ret:
        mbuffer_free(&buf);
        rpc_reply_error(&sockid, &msgid, ret);
        return;
}

int cluster_rpc_init()
{
        DINFO("cluster rpc init\n"); 

        __request_set_handler(CLUSTER_RPC_ANALYSIS, __cluster_srv_analysis, "cluster_srv_analysis");
        __request_set_handler(CLUSTER_RPC_GETINFO, __cluster_srv_getinfo, "cluster_srv_getinfo");
        __request_set_handler(CLUSTER_RPC_PING, __cluster_srv_ping, "cluster_srv_ping");

        rpc_request_register(MSG_CLUSTER, __request_handler, NULL);

        return 0;
}
